#!/usr/bin/python
#-*- coding: utf-8 -*-
# 抽取kafka数据到redis_mq模块
# 作者：王成
# 日期：2017-04-14
import MySQLdb
import redis
import requests
import json
import yaml,logging
from logging.handlers import TimedRotatingFileHandler,RotatingFileHandler
import time,timeit,datetime,sys,os,threading
import Queue
from daemon import Daemon
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import pickle
# Python 2 idiom: re-import sys to get setdefaultencoding() back, then make
# UTF-8 the default codec for implicit str <-> unicode conversions.
reload(sys)
sys.setdefaultencoding('utf-8')

class MyDaemon(Daemon):
    def execute_sql(self,sql,action='select',params=None):
        """Run one SQL statement against the ``mysql_web`` database.

        The connection is opened lazily on first use and re-opened when
        ``ping()`` raises ``MySQLdb.Error``.

        Args:
            sql: statement text. May contain ``%s`` placeholders when
                ``params`` is supplied.
            action: 'select' returns all fetched rows (DictCursor rows),
                'insert' returns ``self.db.insert_id()``; any other value
                (e.g. 'update') returns whatever ``cursor.execute`` returned.
            params: optional values handed to ``cursor.execute`` for
                driver-side binding. ``None`` (the default) executes ``sql``
                verbatim, exactly as before this parameter existed.

        Returns:
            See ``action``. On any error the exception is logged and a
            neutral value is returned instead: ``[]`` for 'select', ``0`` for
            'insert', ``None`` otherwise.
        """
        try:
            if self.db is None:
                self.db = self._connect_mysql_web()
            try:
                self.db.ping()
            except MySQLdb.Error:
                # Connection is no longer usable: replace it.
                self.db = self._connect_mysql_web()

            cursor = self.db.cursor(MySQLdb.cursors.DictCursor)
            try:
                result = cursor.execute(sql, params)
                if action == 'select':
                    result = cursor.fetchall()
                elif action == 'insert':
                    result = self.db.insert_id()
            finally:
                # Close the cursor on the failure path too, not only on success.
                cursor.close()
            return result
        except Exception as e:
            logging.exception('连接数据库时错误: %s', str(e))
            if action == 'select':
                return []
            if action == 'insert':
                return 0
            return None

    def _connect_mysql_web(self):
        """Open a new connection from the ``mysql_web`` section of self.config."""
        conf = self.config['mysql_web']
        return MySQLdb.connect(host=conf['host'],port=conf['port'], user=conf['user'], passwd=conf['password'], db=conf['database'],charset="utf8")
   
    def format_msg(self,msg):
        try:
            row = {}
            if len(msg['plateNumber'])<7 or msg['plateNumber']=='未识别' or msg['plateNumber']=='无' or msg['plateNumber']=='无车牌':
                row['license_plate'] = '无牌'
            else:
                row['license_plate'] = msg['plateNumber']#.decode("gbk").encode('utf-8')
            row['plate_type_id'] = msg['plateType']
            row['location_id'] = msg['locationId']
            row['loc_id'] = msg['locationId']
            row['region_id'] = '420100'
            if self.location2yisa_region.has_key(row['location_id']):
                row['region_id'] = self.location2yisa_region[row['location_id']]
            #s.encode('utf-8', "ignore")
            #s.decode('unicode_escape').encode('utf-8')
            #row['location_name'] = tmp[8].decode('unicode_escape').encode('utf-8')
            #row['location_name'] = tmp[8].encode('utf-8', "ignore")
            #row['location_name'] = unicode(tmp[8], errors='ignore')
            #print row['location_name'],tmp[8]
            #sys.exit()
            #row['capture_type_id'] = tmp[9]
            row['device_id'] = msg['deviceId']
            row['dev_id'] = msg['deviceId']
            row['lane_id'] = 0
            row['speed'] = 0
            ct = list(msg['captureTime'])
            ct.insert(12,':')
            ct.insert(10,':')
            ct.insert(8,' ')
            ct.insert(6,'-')
            ct.insert(4,'-')
            row['capture_time'] = ''.join(ct)
            if row['capture_time'][8:10] != str(datetime.datetime.now().day):
                if self.err_dev_time.has_key(msg['deviceId']):
                    self.err_dev_time[msg['deviceId']]+=1
                else:
                    self.err_dev_time[msg['deviceId']] = 1
                if self.err_dev_time[msg['deviceId']]%1000 is 0:
                    print datetime.datetime.now(),'设备[%s]时间错误数据已达到：%d条' % (str(msg['deviceId']),self.err_dev_time[msg['deviceId']])
                    logging.warning('设备[%s]时间错误数据已达到：%d条',str(msg['deviceId']),self.err_dev_time[msg['deviceId']])
            row['direction_id'] = msg['directionId']
            row['image_url'] = msg['imageUrl']
            if row['image_url'].startswith("X:"):
                row['image_url'] = row['image_url'].replace("X:","http://isilon.whjgj.com/GCJL")
            row['plate_color'] = msg['plateColor']
            
            if row['location_id'] in self.location_to_yisa_location:
                row['location_id'] = self.location_to_yisa_location[row['location_id']]
            else:
                insert_id = self.execute_sql("INSERT INTO mon_location SET location_name = '%s',loc_id='%s',region_code='%s';" % ("",row['location_id'],row['region_id']),"insert")
                if insert_id > 0:
                    print insert_id,row['location_id']
                    r = requests.get('http://127.0.0.1:9002/?format=json&action=update_location_id')
                    r = requests.get('http://127.0.0.1:9002/?format=json&action=make_location_dict_cache')
                    self.load_cache()
                    if row['location_id'] in self.location_to_yisa_location:
                        row['location_id'] = self.location_to_yisa_location[row['location_id']]              
            if ord(row['device_id'][0])>128:
                #print msg
                self.err_count+=1
                if self.err_count%100 is 0:
                    print datetime.datetime.now(),'设备ID错误数据已达到：%d条' % self.err_count
                    logging.warning('设备ID错误数据已达到：%d条',self.err_count)
                row['device_id'] = '420100000000000000'#有汉字
            if row['device_id'] in self.device_to_yisa_device:
                row['device_id'] = self.device_to_yisa_device[row['device_id']]
            else:
                insert_id = self.execute_sql("INSERT INTO mon_device SET dev_id = '%s',loc_id='%s',region_code='%s';" % (row['device_id'],msg['locationId'],row['region_id']),"insert")
                if insert_id > 0:
                    print insert_id,row['device_id'],msg['locationId']
                    r = requests.get('http://127.0.0.1:9002/?format=json&action=update_device_id') 
                    r = requests.get('http://127.0.0.1:9002/?format=json&action=make_device_dict_cache')
                    self.load_cache()
                    if row['device_id'] in self.device_to_yisa_device:
                        row['device_id'] = self.device_to_yisa_device[row['device_id']] 
            if self.crop_config.has_key(row['device_id']):
                row['crop_x'] = self.crop_config[row['device_id']]['crop_x']
                row['crop_y'] = self.crop_config[row['device_id']]['crop_y']
                row['crop_w'] = self.crop_config[row['device_id']]['crop_w']
                row['crop_h'] = self.crop_config[row['device_id']]['crop_h']
            return row
        except Exception, e:
            logging.exception('格式化信息时错误: %s', str(e))
            return None
            
    def load_cache(self):
        """(Re)load the id-mapping dictionaries used by format_msg.

        Sets three attributes:
            device_to_yisa_device: third-party device id -> Yisa device id,
                read from data/device_dict.dat.
            location_to_yisa_location: third-party location id -> Yisa
                location id, read from data/location_dict.dat.
            location2yisa_region: loc_id -> region_code, read from table
                Sheet1 (empty when the query fails, since execute_sql then
                returns []).

        A missing .dat file yields an empty dict. If unpickling raises, the
        exception propagates and the attribute keeps its previous value.
        """
        self.device_to_yisa_device = self._load_pickled_dict("device_dict.dat")
        self.location_to_yisa_location = self._load_pickled_dict("location_dict.dat")

        rows = self.execute_sql("SELECT loc_id,region_code FROM Sheet1","select")
        self.location2yisa_region = dict((row['loc_id'], row['region_code']) for row in rows)

    def _load_pickled_dict(self, file_name):
        """Return the object pickled in data/<file_name> beside this script, or {} if absent."""
        path = os.path.dirname(os.path.abspath(__file__)) + "/data/" + file_name
        if not os.path.isfile(path):
            return {}
        # NOTE(review): unpickling can execute arbitrary code; these files must
        # only ever be written by trusted local processes.
        # Binary mode: pickle data is bytes and must not go through newline translation.
        with open(path, 'rb') as f:
            return pickle.load(f)
        
    def run(self):
        """Daemon main loop: consume the 'langxin' Kafka topic and queue rows to Redis.

        Steps:
            1. Load config.yaml from the script directory into self.config.
            2. Attach a rotating file handler (/var/log/yisa_get_msg_kafka.log).
            3. Initialise counters and caches (load_cache()).
            4. For every Kafka record: JSON-decode it, convert it with
               format_msg(), and queue an INCR of the per-day counter plus an
               RPUSH of the JSON row on a Redis pipeline.
            5. Flush the pipeline once its oldest entry is more than one second
               old or it holds more than 100 commands (each record queues two).

        Errors while consuming are logged, followed by a 10 second pause and a
        retry. Errors during setup are logged and the process exits.
        KeyboardInterrupt returns from the method.
        """
        base_dir = os.path.dirname(os.path.abspath(__file__))
        with open(base_dir + '/config.yaml') as config_file:
            self.config = yaml.safe_load(config_file)

        name = 'yisa_get_msg_kafka'
        logging.basicConfig(level=logging.INFO)
        handler = RotatingFileHandler('/var/log/%s.log' % name, maxBytes=134217728, backupCount=7)
        handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        logging.getLogger('').addHandler(handler)
        logging.warning('启动 [%s]', name)

        self.db = None
        self.err_count = 0  # records whose device id was rejected
        self.err_dev_time = {}  # per-device count of records with a wrong capture day
        self.load_cache()
        self.crop_config = {
            "420100999999999":{"crop_x":0,"crop_y":0,"crop_w":0,"crop_h":0},
        }
        self.MSG_QUEEN = Queue.Queue(0)
        try:
            redis_client = redis.StrictRedis(unix_socket_path=self.config['redis_mq']['unix_socket_path'])
            pipe = redis_client.pipeline()
            logging.warning('创建连接Kafka...')
            kafka_brokers = "gpu1:9092,gpu2:9092,gpu3:9092,gpu4:9092,gpu5:9092,gpu6:9092,gpu7:9092,gpu8:9092,gpu9:9092,gpu10:9092,gpu11:9092,gpu12:9092"
            consumer = KafkaConsumer('langxin',bootstrap_servers=kafka_brokers, auto_offset_reset='latest')
            recv_number = 0
            batch_start = timeit.default_timer()
            while 1:
                try:
                    for record in consumer:
                        recv_number += 1
                        if recv_number % 5000 == 0:
                            logging.warning('offset:%d,recv:%d',record.offset,recv_number)
                        row = self.format_msg(json.loads(record.value))
                        if row:
                            try:
                                # Serialise first so a failure cannot leave the
                                # counter increment queued without its row.
                                payload = json.dumps(row)
                            except UnicodeDecodeError:
                                logging.error('编码json时错误: %s',row['license_plate'])
                            else:
                                # Start the batch clock when the pipeline is
                                # empty; testing for length 1 after queuing two
                                # commands could never match.
                                if len(pipe) == 0:
                                    batch_start = timeit.default_timer()
                                pipe.incr(time.strftime("%Y%m%d",time.localtime(time.time()))+":LANGXIN")
                                pipe.rpush(self.config['redis_mq']['queue_key_name'], payload)
                        # NOTE(review): this check only runs when a record
                        # arrives, so a trailing partial batch waits for the
                        # next record before it is flushed.
                        if len(pipe) > 0 and (timeit.default_timer()-batch_start > 1 or len(pipe) > 100):
                            pipe.execute()
                except KeyboardInterrupt:
                    logging.error('Ctrl+C,终止运行')
                    return
                except Exception as e:
                    logging.exception('读取kafka时错误: %s', str(e))
                    time.sleep(10)
        except Exception as e:
            logging.exception('取数据时错误: %s', str(e))
            sys.exit(0)
    
if __name__ == "__main__":
    daemon = MyDaemon('/var/run/yisa_get_msg_kafka.pid')
    #daemon.run()
    #sys.exit(0)
    if len(sys.argv) == 2:
        if 'start' == sys.argv[1]:
            daemon.start()
        elif 'stop' == sys.argv[1]:
            daemon.stop()
        elif 'restart' == sys.argv[1]:
            daemon.restart()
        else:
            print "Unknown command"
            sys.exit(2)
        sys.exit(0)
    else:
        print "usage: %s start|stop|restart" % sys.argv[0]
        sys.exit(2)
